Implement dimensional time slice crawler#6011
Conversation
| client.executePartition(state, buffer, acknowledgementSet); | ||
| } | ||
|
|
||
| private void createPartitionsFordimensionTypes(LeaderPartition leaderPartition, |
There was a problem hiding this comment.
nit: capitalize Dimension
| DimensionalTimeSliceLeaderProgressState leaderProgressState = | ||
| (DimensionalTimeSliceLeaderProgressState) leaderPartition.getProgressState().get(); | ||
| int remainingHours = leaderProgressState.getRemainingHours(); | ||
| Instant nowUtc = latestModifiedTime.truncatedTo(ChronoUnit.HOURS); |
There was a problem hiding this comment.
Why do we use latestModifiedTime (Instant.now) instead of leaderProgressState.getLastpollTime (ref)?
There was a problem hiding this comment.
I am actually confused on the original code. Why todayUTC is equal to getLastPollTime. Can we divide into it a bit.
I agree we should be consistent with TimeSliceCrawler if possible.
Instant initialDate = leaderProgressState.getLastPollTime();
Instant todayUtc = initialDate.truncatedTo(ChronoUnit.DAYS);
There was a problem hiding this comment.
Historical pull occurs only during the first run. Initially, lastPollTime is set to Instant.now() when the plugin starts, while latestModifiedTime is set to Instant.now() when the crawl begins. I previously used latestModifiedTime to accommodate Office 365's 7-day limit, but this approach isn't generic enough for all connectors. I'll refactor the crawler to consistently use lastPollTime and handle the 7-day limit specifically in the O365 connector.
There was a problem hiding this comment.
We don't need to truncate to hour boundaries (nowUTC) as it causes unnecessary data pulls. For example, if lastPollTime is 14:37:22, truncating to 14:00:00 means we'd pull an extra 37 minutes of data. I'll use lastPollTime directly to ensure we only retrieve data from the exact time the plugin started.
There was a problem hiding this comment.
Prefer to truncate for now to keep consistent with TimeSliceCrawler--it is better to truncate & pull in extra logs than have potential dataloss
There was a problem hiding this comment.
Discussed offline, will truncate to keep it consistent with TimeSliceCrawler
| } | ||
| } catch (Office365Exception e) { | ||
|
|
||
| log.error(NOISY, "{} error processing audit log: {}", |
There was a problem hiding this comment.
Adding logType to the error log would help, I guess?
There was a problem hiding this comment.
logType is already included in the outer catch block's error message, which covers the entire partition, so adding it to individual error logs would be redundant.
| Instant latestModifiedTime = Instant.now(); | ||
| double startCount = partitionsCreatedCounter.count(); | ||
|
|
||
| createPartitionsFordimensionTypes(leaderPartition, coordinator, latestModifiedTime, dimensionTypes); |
There was a problem hiding this comment.
nit. Naming is createPartitionsForDimensionTypes should be capitalize D
| DimensionalTimeSliceLeaderProgressState leaderProgressState = | ||
| (DimensionalTimeSliceLeaderProgressState) leaderPartition.getProgressState().get(); | ||
| int remainingHours = leaderProgressState.getRemainingHours(); | ||
| Instant nowUtc = latestModifiedTime.truncatedTo(ChronoUnit.HOURS); |
There was a problem hiding this comment.
I am actually confused on the original code. Why todayUTC is equal to getLastPollTime. Can we divide into it a bit.
I agree we should be consistent with TimeSliceCrawler if possible.
Instant initialDate = leaderProgressState.getLastPollTime();
Instant todayUtc = initialDate.truncatedTo(ChronoUnit.DAYS);
|
|
||
| return restTemplate.exchange( | ||
| url, | ||
| contentUri, |
There was a problem hiding this comment.
We are hitting the contentUri received from external source. For security, we should validate this url before hitting.
| throw new RuntimeException("Interrupted while waiting to retry time window", ie); | ||
| } | ||
| public AuditLogsResponse searchAuditLogs(final String logType, final Instant startTime, final Instant endTime, final String nextPageUri) { | ||
| try { |
There was a problem hiding this comment.
Can we do a null check for startTime & endTime? Example
aabf54e to
243fa1a
Compare
| service.initializeSubscriptions(); | ||
| office365Iterator.initialize(lastPollTime); | ||
| return office365Iterator; |
There was a problem hiding this comment.
Where are these steps taking place now?
There was a problem hiding this comment.
I didn't add this initialization before, as subscriptions were likely already set up for this tenant, allowing log fetching without explicit initialization. I'll now add it to Office365Source's start() method, which is a more appropriate place since it's a one-time setup that should happen when the plugin starts.
There was a problem hiding this comment.
Also, we no longer need the iterator pattern since we're now processing logs in time-based chunks rather than page by page.
Signed-off-by: Alekhya Parisha <aparisha@amazon.com>
243fa1a to
2f95fba
Compare
| return new Record<>(event); | ||
| } catch (JsonProcessingException e) { | ||
| // JSON parsing errors are non-retryable as they indicate malformed data | ||
| throw new Office365Exception("Failed to parse audit log: " + logId, e, false); |
There was a problem hiding this comment.
Will this cause infinite loop of processing?
There was a problem hiding this comment.
This is marked as non-retryable and swallowed / sent to pipeline DLQ (when available).
Signed-off-by: Alekhya Parisha aparisha@amazon.com
Description
Issues Resolved
Resolves #[Issue number to be closed when this PR is merged]
Check List
By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.